package com.harmonycloud;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class FpGrowth {

    /**
     * Entry point: mines association rules from alarm data using Spark MLlib
     * FP-Growth and persists the resulting rules via {@code DbStoreHelper}.
     *
     * <p>Required arguments (all six are mandatory — {@code args[4]} and
     * {@code args[5]} are always read):
     * <ol start="0">
     *   <li>path to the BPC alarm CSV file</li>
     *   <li>minimum support threshold</li>
     *   <li>number of Spark data partitions</li>
     *   <li>minimum confidence threshold</li>
     *   <li>time window size used when grouping alarms into transactions</li>
     *   <li>application-model id used to load alarms and tag generated rules</li>
     * </ol>
     */
    public static void main(String[] args) {

        double minSupport = 0.005;   // minimum support
        int numPartition = 8;        // number of data partitions
        double minConfidence = 0.6;  // minimum confidence

        // BUG FIX: the original only checked args.length < 1 but read args[4]
        // and args[5] unconditionally, which threw
        // ArrayIndexOutOfBoundsException for 1-5 arguments. All six arguments
        // are in fact mandatory, so validate once up front.
        if (args.length < 6) {
            System.out.println(
                "usage: <data_path> <minSupport> <numPartition> <minConfidence> <timesize> <appmodelId>");
            System.exit(-1);
        }
        String bpc_path = args[0];
        minSupport = Double.parseDouble(args[1]);
        numPartition = Integer.parseInt(args[2]);
        minConfidence = Double.parseDouble(args[3]);
        int timesize = Integer.parseInt(args[4]);
        Integer appmodelId = Integer.parseInt(args[5]);

        SparkConf conf = new SparkConf().setAppName("FPDemo");
        conf.set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // BUG FIX: wrap the body so the Spark context is always released even
        // when mining fails part-way (the original skipped sc.stop() on error).
        try {
            // Alarm records come from two sources: the CSV file and the DB.
            List<Alarm> filedatabpc = ReadCsvFile.getAlarmFile(bpc_path);
            DbStoreHelper dbStoreHelper = new DbStoreHelper();
            // BUG FIX: the original left filedata null after an SQLException and
            // then dereferenced it (guaranteed NullPointerException). Fall back
            // to an empty list so the CSV data can still be mined best-effort.
            List<Alarm> filedata = new ArrayList<>();
            try {
                filedata = dbStoreHelper.selectByAppmodelId(appmodelId);
            } catch (SQLException e) {
                e.printStackTrace();
            }
            filedata.addAll(filedatabpc);
            // Sort chronologically before windowing alarms into transactions.
            Collections.sort(filedata, Comparator.comparing(Alarm::getHappendTime));
            List<List<String>> preData = ReadCsvFile.getAlarmPreData(filedata, timesize);

            JavaRDD<List<String>> transactions = sc.parallelize(preData);
            FPGrowth fpGrowth =
                new FPGrowth().setMinSupport(minSupport).setNumPartitions(numPartition);
            FPGrowthModel<String> model = fpGrowth.run(transactions); // run FP-Growth

            // Collect rules meeting the confidence threshold and persist them.
            List<Rule> ruleList = new ArrayList<>();
            for (AssociationRules.Rule<String> rule :
                    model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
                Rule ginimodel = new Rule(appmodelId, rule.javaAntecedent() + "",
                    rule.javaConsequent() + "", rule.confidence() + "");
                System.out.println(
                    rule.javaAntecedent() + "=>" + rule.javaConsequent() + ", " + rule.confidence());
                ruleList.add(ginimodel);
            }
            DbStoreHelper.insertList(ruleList);
        } finally {
            sc.stop();
        }
    }
}

